Using dask.distributed as Parallel Pool Example

This example shows how you can use any parallel pool-executor-like object to run combos, specifically the dask.distributed.Client.


In [1]:
import xyzpy as xyz
import numpy as np
from dask.distributed import Client

First we instantiate the client:


In [2]:
address = None  # put in an actual address like '127.0.0.1:8786' if you have a scheduler running
c = Client(address=address)  
c


Out[2]:

Client

Cluster

  • Workers: 8
  • Cores: 8
  • Memory: 16.68 GB

The Client instance can act like a normal parallel pool, since it has a submit method that returns futures:


In [3]:
c.submit


Out[3]:
<bound method Client.submit of <Client: scheduler='tcp://127.0.0.1:43043' processes=8 cores=8>>
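This is the same protocol the standard library's executors follow, which is why any of them can stand in for the dask client. A minimal sketch with concurrent.futures.ThreadPoolExecutor (not part of this example, just illustrating the submit-returns-a-future pattern):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x ** 2

# any pool object with a concurrent.futures-style ``submit`` method
# that returns futures can be passed as the executor
pool = ThreadPoolExecutor(max_workers=4)
future = pool.submit(square, 7)
print(future.result())  # 49
pool.shutdown()
```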

Let's define our test function and combos:


In [4]:
def rad(x, y, n):
    r = (x**n + y**n)**(1 / n)
    if r == 0.0:
        return 1
    return np.sin(r) / r

combos = {
    'x': np.linspace(-15, 15, 51),
    'y': np.linspace(-15, 15, 51),
    'n': [2, 4, 6],
}

r = xyz.Runner(rad, var_names='sinc2d')

Now we can run (or harvest, or grow_missing) our combos, supplying the Client instance to the executor= keyword:


In [5]:
r.run_combos(combos, executor=c)


100%|##########| 7803/7803 [00:23<00:00, 330.19it/s]
Out[5]:
<xarray.Dataset>
Dimensions:  (n: 3, x: 51, y: 51)
Coordinates:
  * x        (x) float64 -15.0 -14.4 -13.8 -13.2 -12.6 -12.0 -11.4 -10.8 ...
  * y        (y) float64 -15.0 -14.4 -13.8 -13.2 -12.6 -12.0 -11.4 -10.8 ...
  * n        (n) int64 2 4 6
Data variables:
    sinc2d   (x, y, n) float64 0.03308 -0.04752 -0.05369 0.04479 -0.05587 ...
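The progress bar total follows directly from the combos: run_combos evaluates the full Cartesian product of the supplied values.

```python
from math import prod

# x and y each take 51 values and n takes 3, so the full
# Cartesian product of combos is 51 * 51 * 3 runs
n_runs = prod([51, 51, 3])
print(n_runs)  # 7803, the total shown by the progress bar
```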

That should take long enough for you to check out the client's status, including the tasks being processed, at for example: http://127.0.0.1:8787/tasks.

Finally, visualize the output:


In [6]:
r.last_ds.xyz.iheatmap('x', 'y', 'sinc2d', col='n')


Loading BokehJS ...